package day04;

import day03.window.FlinkWindow00;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.io.IOException;

/**
 * Flink state management - state backends.
 *
 * <p>Walks through configuring three kinds of state backend (memory, file system and
 * RocksDB) on the {@link StreamExecutionEnvironment} owned by a {@link FlinkWindow00},
 * then triggers execution of that environment.
 *
 * @author lvbingbing
 * @date 2022-01-05 23:24
 */
public class FlinkState04 {

    /** Checkpoint data URI passed to both the file-system and the RocksDB state backend. */
    private static final String CHECKPOINT_DATA_URI =
            "hdfs://hadoop102:9000/flink/stateBackend/checkpoint/";

    /**
     * Entry point: builds the demo pipeline holder, configures the state backends and
     * runs the job.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if configuring a state backend or executing the job fails
     */
    public static void main(String[] args) throws Exception {
        // 1. Build the FlinkWindow00 helper with parallelism 1. According to the original
        //    author's note its constructor initialises the environment and reads from a
        //    socket text stream (FlinkWindow00 is defined elsewhere - confirm there).
        FlinkWindow00 window = new FlinkWindow00(1);
        // 2. Grab the execution environment before it is configured.
        StreamExecutionEnvironment executionEnv = window.getEnv();
        // 3. State backend walkthrough.
        studyStateBackend(window);
        // 4. Trigger program execution.
        executionEnv.execute();
    }

    /**
     * State backend walkthrough.
     *
     * <p>Registers, one after another, a memory state backend, a file-system state backend
     * and a RocksDB state backend on the environment of {@code flinkWindow}. All three
     * calls target the same environment, so the RocksDB backend is the one registered last.
     *
     * @param flinkWindow holder of the execution environment to configure
     * @throws IOException declared because constructing the RocksDB state backend may throw it
     */
    private static void studyStateBackend(FlinkWindow00 flinkWindow) throws IOException {
        StreamExecutionEnvironment executionEnv = flinkWindow.getEnv();

        // NOTE(review): each backend is deliberately held as the StateBackend interface
        // type, exactly as before, so the same setStateBackend(StateBackend) call is used
        // for all three.

        // 1. Memory state backend.
        StateBackend inMemory = new MemoryStateBackend();
        executionEnv.setStateBackend(inMemory);

        // 2. File-system state backend.
        StateBackend onFileSystem = new FsStateBackend(CHECKPOINT_DATA_URI);
        executionEnv.setStateBackend(onFileSystem);

        // 3. RocksDB state backend.
        StateBackend onRocksDb = new RocksDBStateBackend(CHECKPOINT_DATA_URI);
        executionEnv.setStateBackend(onRocksDb);
    }
}




















